/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.tests.util.hbase;

import org.apache.flink.tests.util.AutoClosableProcess;
import org.apache.flink.tests.util.CommandLineWrapper;
import org.apache.flink.tests.util.activation.OperatingSystemRestriction;
import org.apache.flink.tests.util.cache.DownloadCache;
import org.apache.flink.util.OperatingSystem;

import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
 * {@link HBaseResource} that downloads an HBase binary distribution and sets up a local standalone
 * HBase cluster from it.
 *
 * <p>The distribution is extracted into a temporary folder, the cluster is started and stopped via
 * the {@code start-hbase.sh} / {@code stop-hbase.sh} scripts of the distribution, and all table
 * operations are executed by piping commands into {@code bin/hbase shell}.
 */
public class LocalStandaloneHBaseResource implements HBaseResource {

    private static final Logger LOG = LoggerFactory.getLogger(LocalStandaloneHBaseResource.class);

    /** Maximum number of times a start/stop script is executed before giving up. */
    private static final int MAX_RETRIES = 3;

    /** Maximum number of one-second pauses spent waiting after each script execution. */
    private static final int RETRY_INTERVAL_SECONDS = 30;

    private final TemporaryFolder tmp = new TemporaryFolder();

    private final DownloadCache downloadCache = DownloadCache.get();
    private final String hbaseVersion;
    private Path hbaseDir;

    /**
     * Creates a resource for the given HBase version.
     *
     * @param hbaseVersion version string used to build the download URL and the directory name
     */
    LocalStandaloneHBaseResource(String hbaseVersion) {
        OperatingSystemRestriction.forbid(
                String.format(
                        "The %s relies on UNIX utils and shell scripts.",
                        getClass().getSimpleName()),
                OperatingSystem.WINDOWS);
        this.hbaseVersion = hbaseVersion;
    }

    /** Returns the Apache archive URL of the binary tarball for the configured version. */
    private String getHBaseDownloadUrl() {
        return String.format(
                "https://archive.apache.org/dist/hbase/%1$s/hbase-%1$s-bin.tar.gz", hbaseVersion);
    }

    /**
     * Creates the temporary working directory, fetches and unpacks the HBase distribution and
     * starts the cluster.
     */
    @Override
    public void before() throws Exception {
        tmp.create();
        downloadCache.before();

        this.hbaseDir = tmp.newFolder("hbase-" + hbaseVersion).toPath().toAbsolutePath();
        setupHBaseDist();
        setupHBaseCluster();
    }

    /**
     * Obtains the HBase archive through the download cache, extracts it into {@link #hbaseDir} and
     * writes an {@code hbase-site.xml} that points {@code hbase.tmp.dir} at that directory.
     */
    private void setupHBaseDist() throws IOException {
        final Path downloadDirectory = tmp.newFolder("getOrDownload").toPath();
        final Path hbaseArchive =
                downloadCache.getOrDownload(getHBaseDownloadUrl(), downloadDirectory);

        LOG.info("HBase location: {}", hbaseDir.toAbsolutePath());
        AutoClosableProcess.runBlocking(
                CommandLineWrapper.tar(hbaseArchive)
                        .extract()
                        .zipped()
                        .strip(1)
                        .targetDir(hbaseDir)
                        .build());

        LOG.info("Configure {} as hbase.tmp.dir", hbaseDir.toAbsolutePath());
        final String tmpDirConfig =
                "<configuration><property><name>hbase.tmp.dir</name><value>"
                        + hbaseDir
                        + "</value></property></configuration>";
        // Encode explicitly so the written file does not depend on the platform default charset.
        Files.write(
                hbaseDir.resolve(Paths.get("conf", "hbase-site.xml")),
                tmpDirConfig.getBytes(StandardCharsets.UTF_8));
    }

    /** Runs {@code start-hbase.sh} and waits until the HMaster reports as running. */
    private void setupHBaseCluster() throws IOException {
        LOG.info("Starting HBase cluster...");
        runHBaseProcessWithRetry("start-hbase.sh", () -> !isHMasterRunning());
        LOG.info("Start HBase cluster success");
    }

    /**
     * Stops the cluster and releases the download cache and the temporary folder.
     *
     * <p>The cleanup steps run even if stopping the cluster fails, so that a failed shutdown does
     * not leave the temporary directory behind.
     */
    @Override
    public void afterTestSuccess() {
        try {
            shutdownResource();
        } finally {
            try {
                downloadCache.afterTestSuccess();
            } finally {
                tmp.delete();
            }
        }
    }

    /**
     * Runs {@code stop-hbase.sh} and waits until no HMaster process is listed anymore. An {@link
     * IOException} is logged rather than propagated.
     */
    private void shutdownResource() {
        LOG.info("Stopping HBase Cluster...");
        try {
            runHBaseProcessWithRetry("stop-hbase.sh", () -> isHMasterAlive());
            // Only report success when the stop sequence actually completed.
            LOG.info("Stop HBase Cluster success");
        } catch (IOException ioe) {
            LOG.warn("Error when shutting down HBase Cluster.", ioe);
        }
    }

    /**
     * Executes a script from the distribution's {@code bin} directory and waits for it to take
     * effect, re-running the script up to {@link #MAX_RETRIES} times.
     *
     * @param command name of the script inside {@code bin}
     * @param processStatusChecker returns {@code true} while the desired state has NOT been
     *     reached yet, i.e. while waiting must continue
     * @throws IOException if the thread is interrupted while waiting
     * @throws IllegalArgumentException if the desired state is not reached after all retries
     */
    private void runHBaseProcessWithRetry(String command, Supplier<Boolean> processStatusChecker)
            throws IOException {
        LOG.info("Execute {} for HBase Cluster", command);

        for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
            try {
                AutoClosableProcess.runBlocking(
                        hbaseDir.resolve(Paths.get("bin", command)).toString());
            } catch (IOException ioe) {
                // The script may fail although the cluster still reaches the desired state, so
                // the status check below decides whether this attempt succeeded.
                LOG.warn("Get exception when execute {} ", command, ioe);
            }

            if (awaitDesiredState(command, processStatusChecker)) {
                return;
            }

            if (attempt == MAX_RETRIES) {
                LOG.error("Execute {} failed, retry times {}", command, attempt);
                throw new IllegalArgumentException(
                        String.format("Execute %s failed after retry %s times", command, attempt));
            }
            LOG.warn("Execute {} failed, retry times {}", command, attempt);
        }
    }

    /**
     * Polls the status checker, pausing one second between polls, for at most {@link
     * #RETRY_INTERVAL_SECONDS} pauses.
     *
     * @param command script name, used for logging only
     * @param stillPending returns {@code true} while the desired state has not been reached
     * @return {@code true} if the desired state was reached, {@code false} on timeout
     * @throws IOException if the thread is interrupted while waiting
     */
    private boolean awaitDesiredState(String command, Supplier<Boolean> stillPending)
            throws IOException {
        for (int waitedSeconds = 0; stillPending.get(); waitedSeconds++) {
            if (waitedSeconds >= RETRY_INTERVAL_SECONDS) {
                return false;
            }
            try {
                LOG.info("Waiting for HBase {} works", command);
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting instead of silently continuing.
                Thread.currentThread().interrupt();
                throw new IOException(
                        String.format("Interrupted while waiting for %s to take effect", command),
                        e);
            }
        }
        return true;
    }

    /**
     * Checks whether scanning {@code hbase:meta} through the HBase shell prints a line containing
     * {@code hbase:namespace}. Returns {@code false} if the shell invocation fails.
     */
    private boolean isHMasterRunning() {
        try {
            final AtomicBoolean namespaceTableSeen = new AtomicBoolean(false);
            queryHBaseStatus(
                    line -> {
                        if (line.contains("hbase:namespace")) {
                            namespaceTableSeen.set(true);
                        }
                    });
            return namespaceTableSeen.get();
        } catch (IOException ioe) {
            return false;
        }
    }

    /** Scans {@code hbase:meta} via the HBase shell and feeds each stdout line to the consumer. */
    private void queryHBaseStatus(final Consumer<String> stdoutProcessor) throws IOException {
        executeHBaseShell("scan 'hbase:meta'", stdoutProcessor);
    }

    /**
     * Checks whether the output of {@code jps} contains a line mentioning {@code HMaster}. Returns
     * {@code false} if {@code jps} cannot be executed.
     */
    private boolean isHMasterAlive() {
        try {
            final AtomicBoolean hMasterListed = new AtomicBoolean(false);
            queryHBaseProcess(
                    line -> {
                        if (line.contains("HMaster")) {
                            hMasterListed.set(true);
                        }
                    });
            return hMasterListed.get();
        } catch (IOException ioe) {
            return false;
        }
    }

    /** Runs {@code jps} and feeds each stdout line to the consumer. */
    private void queryHBaseProcess(final Consumer<String> stdoutProcessor) throws IOException {
        AutoClosableProcess.create("jps").setStdoutProcessor(stdoutProcessor).runBlocking();
    }

    /**
     * Issues a {@code create} command to the HBase shell for the given table, with one {@code
     * {NAME=>'...'}} descriptor per column family.
     */
    @Override
    public void createTable(String tableName, String... columnFamilies) throws IOException {
        final String familyDescriptors =
                Arrays.stream(columnFamilies)
                        .map(cf -> String.format("{NAME=>'%s'}", cf))
                        .collect(Collectors.joining(","));
        final String createTable = String.format("create '%s',", tableName) + familyDescriptors;

        executeHBaseShell(createTable);
    }

    /**
     * Scans the given table through the HBase shell.
     *
     * @return the shell output lines that contain {@code value=}, in output order
     */
    @Override
    public List<String> scanTable(String tableName) throws IOException {
        final List<String> result = new ArrayList<>();
        executeHBaseShell(
                String.format("scan '%s'", tableName),
                line -> {
                    if (line.contains("value=")) {
                        result.add(line);
                    }
                });
        return result;
    }

    /** Issues a {@code put} command to the HBase shell for a single cell. */
    @Override
    public void putData(
            String tableName,
            String rowKey,
            String columnFamily,
            String columnQualifier,
            String value)
            throws IOException {
        executeHBaseShell(
                String.format(
                        "put '%s','%s','%s:%s','%s'",
                        tableName, rowKey, columnFamily, columnQualifier, value));
    }

    /** Executes a shell command and discards its standard output. */
    private void executeHBaseShell(String cmd) throws IOException {
        executeHBaseShell(cmd, line -> {});
    }

    /**
     * Starts {@code bin/hbase shell}, passes {@code cmd} on standard input and forwards every
     * stdout line to {@code stdoutProcessor}, blocking until the process finishes.
     */
    private void executeHBaseShell(String cmd, Consumer<String> stdoutProcessor)
            throws IOException {
        AutoClosableProcess.create(hbaseDir.resolve(Paths.get("bin", "hbase")).toString(), "shell")
                .setStdoutProcessor(stdoutProcessor)
                .setStdInputs(cmd)
                .runBlocking();
    }
}
